Problem description
I’m getting started on a Beam project that reads from AWS Kinesis, so I have a simple DoFn that accepts a KinesisRecord and logs the contents. I want to write a unit test to run this DoFn and prove that it works. Unit testing with a KinesisRecord has proven to be challenging, though.
I get this error when I try to just use Create.of(testKinesisRecord):
java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().
I have tried providing the KinesisRecordCoder explicitly using "withCoder" as the error suggests, but it’s a private class. Perhaps there's another way to unit test a DoFn?
Test code:
public class MyProjectTests {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testPoC() {
        var testKinesisRecord = new KinesisRecord(
            ByteBuffer.wrap("SomeData".getBytes()),
            "seq01",
            12,
            "pKey",
            Instant.now().minus(Duration.standardHours(4)),
            Instant.now(),
            "MyStream",
            "shard-001"
        );
        PCollection<Void> output =
            p.apply(Create.of(testKinesisRecord))
             .apply(ParDo.of(new MyProject.PrintRecordFn()));
        var result = p.run();
        result.waitUntilFinish();
        result.metrics().allMetrics().getCounters().forEach(longMetricResult -> {
            Assertions.assertEquals(1, longMetricResult.getCommitted().intValue());
        });
    }
}
DoFn code:
static class PrintRecordFn extends DoFn<KinesisRecord, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(PrintRecordFn.class);
    private final Counter items = Metrics.counter(PrintRecordFn.class, "itemsProcessed");

    @ProcessElement
    public void processElement(@Element KinesisRecord element) {
        items.inc();
        LOG.info("Stream: `{}` Shard: `{}` Arrived at `{}`\nData: {}",
            element.getStreamName(),
            element.getShardId(),
            element.getApproximateArrivalTimestamp(),
            element.getDataAsBytes());
    }
}
Recommended answer
KinesisRecordCoder is meant for internal use, so it is package-private. In the meantime, you can provide a custom AWSClientsProvider and use it to generate test data. For an example, take a look at KinesisMockReadTest and its custom provider.
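A different route from the one suggested in the answer, offered only as a sketch: because the error asks for a coder and KinesisRecordCoder is not accessible, a test could supply its own. The code below shows just the round-trip logic such a coder would need, written with plain Java streams so it runs without Beam on the classpath. RecordFields and RecordFieldsCodec are hypothetical names that mirror the constructor arguments from the question. Wiring this into Beam (for example, a subclass of CustomCoder<KinesisRecord> passed to Create.of(...).withCoder(...)) is an assumption and is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the values passed to the KinesisRecord constructor in the question. */
final class RecordFields {
    final byte[] data;
    final String sequenceNumber;
    final long subSequenceNumber;
    final String partitionKey;
    final long arrivalMillis; // approximate arrival timestamp, epoch millis
    final long readMillis;    // read time, epoch millis
    final String streamName;
    final String shardId;

    RecordFields(byte[] data, String sequenceNumber, long subSequenceNumber,
                 String partitionKey, long arrivalMillis, long readMillis,
                 String streamName, String shardId) {
        this.data = data;
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
        this.partitionKey = partitionKey;
        this.arrivalMillis = arrivalMillis;
        this.readMillis = readMillis;
        this.streamName = streamName;
        this.shardId = shardId;
    }
}

/** Hypothetical codec: writes every field in a fixed order and reads it back in the same order. */
final class RecordFieldsCodec {
    static void encode(RecordFields r, OutputStream out) throws IOException {
        DataOutputStream d = new DataOutputStream(out);
        d.writeInt(r.data.length); // length prefix so decode knows how many bytes to read
        d.write(r.data);
        d.writeUTF(r.sequenceNumber);
        d.writeLong(r.subSequenceNumber);
        d.writeUTF(r.partitionKey);
        d.writeLong(r.arrivalMillis);
        d.writeLong(r.readMillis);
        d.writeUTF(r.streamName);
        d.writeUTF(r.shardId);
        d.flush();
    }

    static RecordFields decode(InputStream in) throws IOException {
        DataInputStream d = new DataInputStream(in);
        byte[] data = new byte[d.readInt()];
        d.readFully(data);
        // Java evaluates arguments left to right, so the reads match the write order above.
        return new RecordFields(data, d.readUTF(), d.readLong(), d.readUTF(),
                                d.readLong(), d.readLong(), d.readUTF(), d.readUTF());
    }

    /** Encodes to an in-memory buffer and decodes it again, as a coder round-trip check would. */
    static RecordFields roundTrip(RecordFields r) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            encode(r, buf);
            return decode(new ByteArrayInputStream(buf.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real test the encode and decode bodies would read from and construct KinesisRecord itself through its public constructor and getters. Whether that is preferable to the mock AWSClientsProvider approach depends on whether the goal is to test only the DoFn or the read path as well.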