对我来说,这种新的编程范例非常新。我想用给定类中定义的.map()
替换DistributedFunction
中的匿名函数。但是我不确定如何创建新功能。
我有以下管道:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(e -> {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return kafkaMessage;
})
.map(m -> m.getData())
.drainTo(Sinks.logger());
根据一些Jet示例,我得出以下结论:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(KafkaHelper::decodeKafkaMessage)
.map(m -> m.getData())
.drainTo(Sinks.logger());
KafkaHelper类:
public final class KafkaHelper implements Serializable {
private static final long serialVersionUID = -3556269069192202060L;
public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
return kafkaMessage;
}
}
这种方法是否遵循规范/要求将
DistributedFunction
传递给.map()
?如果是,为什么?如果没有,我应该怎么做? 最佳答案
是的,在两个示例中,您都在创建DistributedFunction
实例并将其传递给map()
。 Java 8有一个规则,根据该规则,第一个示例中的lambda函数和第二个示例中的方法引用用于创建DistributedFunction
的综合子类型,该子类型使用您提供的代码实现其Single Abstract Method(“ SAM”)。
您的KafkaHelper
不必是Serializable
,因为您无需实例化它。您还可以将静态方法decodeKafkaMessage
放在任何其他类中,因为它与该类实例无关。