Apache Flink: performance issues when running multiple jobs
Problem description
With a large number of Flink SQL queries (100 copies of the query below), the Flink command-line client fails on a YARN cluster with "JobManager did not respond within 600000 ms", i.e. the job is never started on the cluster.
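- The JobManager log shows nothing after the last TaskManager has started, except DEBUG messages saying "Job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", which suggests it may be stuck (creating the ExecutionGraph?).
- The same happens with a local standalone Java program (with high CPU usage at first).
- Note: each row in structStream contains 515 columns (many of them end up null), including a column holding the raw message.
- In the YARN cluster we specified 18 GB for the TaskManager, 18 GB for the JobManager, 5 slots each, and a parallelism of 725 (the number of partitions in our Kafka source).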
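As a back-of-the-envelope check on the figures above, the sketch below computes how many TaskManagers the stated parallelism implies on YARN. It assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism; the class and method names are illustrative only and not part of any Flink API.

```java
// Rough YARN capacity estimate for the configuration described above.
// Assumption: default slot sharing, so a job needs as many slots as its
// highest operator parallelism. Names are illustrative, not a Flink API.
public class SlotMath {

    // TaskManagers needed to provide `parallelism` slots when each
    // TaskManager offers `slotsPerTaskManager` slots (ceiling division).
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    // Total TaskManager memory (GB) requested from YARN.
    static int totalTaskManagerMemoryGb(int taskManagers, int memoryPerTaskManagerGb) {
        return taskManagers * memoryPerTaskManagerGb;
    }

    public static void main(String[] args) {
        int parallelism = 725; // equals the Kafka partition count
        int slotsPerTm = 5;
        int memPerTmGb = 18;

        int tms = taskManagersNeeded(parallelism, slotsPerTm);
        System.out.println("TaskManagers needed: " + tms); // 145
        System.out.println("TaskManager memory (GB): "
                + totalTaskManagerMemoryGb(tms, memPerTmGb)); // 2610
    }
}
```

Under default slot sharing, each of those 725 slots would also host one subtask of every operator from all 100 query pipelines, so the number of tasks the JobManager has to deploy grows with the number of queries.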
select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp,
EventTimestamp, RawMsg, Source
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType'
and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment,
CollectedTimestamp, EventTimestamp, RawMsg, Source
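The `tumble(proctime, INTERVAL '1' SECOND)` clause assigns every row to a non-overlapping one-second window, and `count(*)` is computed per window and per combination of grouping columns. The plain-Java sketch below models only that bucketing on epoch-millisecond timestamps; it is a conceptual illustration, not how Flink implements windows, and the `TumbleSketch` class and its single string key (standing in for the grouping columns) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model of TUMBLE(proctime, INTERVAL '1' SECOND) + GROUP BY:
// each timestamp falls into exactly one 1-second window, and rows are
// counted per (window start, grouping key). Not Flink's implementation.
public class TumbleSketch {

    static final long WINDOW_SIZE_MS = 1000L;

    // Start of the tumbling window that contains a non-negative timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Counts rows per "windowStart|key" bucket, like COUNT(*) ... GROUP BY window, key.
    static Map<String, Integer> countPerWindowAndKey(long[] timestamps, String[] keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            String bucket = windowStart(timestamps[i]) + "|" + keys[i];
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 1_400L, 1_999L, 2_000L, 2_500L};
        String[] keys = {"a", "a", "b", "a", "a"};
        System.out.println(countPerWindowAndKey(ts, keys));
        // prints {1000|a=2, 1000|b=1, 2000|a=2}
    }
}
```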
Code
public static void main(String[] args) throws Exception {
    FileSystems.newFileSystem(KafkaReadingStreamingJob.class
            .getResource(WHITELIST_CSV).toURI(), new HashMap<>());
    final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
    final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
    tableEnv.registerDataStream("structStream", structStream);
    tableEnv.scan("structStream").printSchema();
    for (int i = 0; i < 100; i++) {
        for (String query : Queries.sample) {
            // Queries.sample contains the single query shown above.
            Table selectQuery = tableEnv.sqlQuery(query);
            DataStream<Row> selectQueryStream =
                    tableEnv.toAppendStream(selectQuery, Row.class);
            selectQueryStream.print();
        }
    }
    // execute program
    streamingEnvironment.execute("Kafka Streaming SQL");
}
private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();
    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message.
    FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(
            KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);
    return stream;
}
private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields.
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");
    // Fill typeInformationArray with the String type for every field
    // except the last one, which is of a time type
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}
private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}
Recommended answer
To me this looks as if the JobManager is overloaded with too many concurrently running jobs. I'd suggest distributing the jobs across more JobManagers / Flink clusters.
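One way to act on this suggestion is to split the query list into smaller groups and build and submit each group as its own, smaller job. The sketch below shows only the partitioning step in plain Java; the `QueryBatches` name and the group size of 25 are assumptions for illustration, and the Flink-specific job building and submission are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Splits a list of SQL queries into consecutive groups of at most `groupSize`
// queries, so each group can become a separate, smaller Flink job.
// Flink job construction and submission are intentionally omitted.
public class QueryBatches {

    static List<List<String>> partition(List<String> queries, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<String>> groups = new ArrayList<>();
        for (int start = 0; start < queries.size(); start += groupSize) {
            int end = Math.min(start + groupSize, queries.size());
            // Copy the view so each group is independent of the source list.
            groups.add(new ArrayList<>(queries.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        // 100 copies of the same query, as in the question.
        List<String> queries = new ArrayList<>(Collections.nCopies(100, "select count(*) ..."));
        List<List<String>> groups = partition(queries, 25); // hypothetical group size
        System.out.println(groups.size() + " groups of " + groups.get(0).size());
        // prints "4 groups of 25"
    }
}
```

Each group could then be handled by a separate program run (for example one `flink run` invocation per target cluster) that applies the same `sqlQuery` / `toAppendStream` steps as the question's `main` method, followed by its own `execute()` call.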