Problem Description
I'm using a custom sink in Structured Streaming (Spark 2.2.0) and noticed that Spark produces an incorrect metric for the number of input rows - it's always zero.
Here is how I build the stream:
StreamingQuery writeStream = session
    .readStream()
    .schema(RecordSchema.fromClass(TestRecord.class))
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv(s3Path.toString())
    .as(Encoders.bean(TestRecord.class))
    .flatMap(
        (FlatMapFunction<TestRecord, TestOutputRecord>) (u) -> {
            List<TestOutputRecord> list = new ArrayList<>();
            try {
                TestOutputRecord result = transformer.convert(u);
                list.add(result);
            } catch (Throwable t) {
                System.err.println("Failed to convert a record");
                t.printStackTrace();
            }
            return list.iterator();
        },
        Encoders.bean(TestOutputRecord.class))
    .map(new DataReinforcementMapFunction<>(), Encoders.bean(TestOutputRecord.class))
    .writeStream()
    .trigger(Trigger.ProcessingTime(WRITE_FREQUENCY, TimeUnit.SECONDS))
    .format(MY_WRITER_FORMAT)
    .outputMode(OutputMode.Append())
    .queryName("custom-sink-stream")
    .start();

writeStream.processAllAvailable();
writeStream.stop();
Log:
Streaming query made progress: {
  "id" : "a8a7fbc2-0f06-4197-a99a-114abae24964",
  "runId" : "bebc8a0c-d3b2-4fd6-8710-78223a88edc7",
  "name" : "custom-sink-stream",
  "timestamp" : "2018-01-25T18:39:52.949Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 781,
    "triggerExecution" : 781
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[s3n://test-bucket/test]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "com.mycompany.spark.MySink@f82a99"
  }
}
Do I have to populate any metrics in my custom sink to be able to track progress? Or could it be a problem in FileStreamSource when it reads from an S3 bucket?
Recommended Answer
The problem was related to using dataset.rdd in my custom sink: calling it creates a new query plan that StreamExecution doesn't know about, so StreamExecution cannot collect metrics for the batch.
Replacing data.rdd.mapPartitions with data.queryExecution.toRdd.mapPartitions fixes the issue.
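For context, here is a minimal Scala sketch of what that fix looks like inside a custom sink's addBatch. The class name and write logic are illustrative assumptions, not the actual MySink from the question; also note that Sink is an internal Spark API in 2.2.0.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.Sink

class MySink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // WRONG: data.rdd builds a brand-new query plan, so StreamExecution
    // cannot attribute the processed rows to this trigger and reports
    // numInputRows = 0:
    //   data.rdd.mapPartitions { rows => write(rows); Iterator.empty }.count()

    // RIGHT: reuse the incremental plan that StreamExecution already built.
    // toRdd yields InternalRow, so fields must be decoded manually.
    data.queryExecution.toRdd.mapPartitions { rows: Iterator[InternalRow] =>
      rows.foreach { row =>
        // write the row to the external system (illustrative placeholder)
      }
      Iterator.empty
    }.count() // an action whose only purpose is to force the write to run
  }
}

Since the write happens as a side effect inside mapPartitions, foreachPartition would serve equally well; the count() above exists only to trigger evaluation.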