对我来说,这种新的编程范例非常新。我想用给定类中定义的.map()替换DistributedFunction中的匿名函数。但是我不确定如何创建新功能。

我有以下管道:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(e -> {
    Gson gson = new Gson();

    KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
    KafkaMessage.class);

    byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

    try {
       kafkaMessage.setData(new String(encodedData, "utf-8"));
    } catch (Exception e1) {
       // TODO Auto-generated catch block
       e1.printStackTrace();
    }

    return kafkaMessage;
  })
 .map(m -> m.getData())
 .drainTo(Sinks.logger());


根据一些Jet示例,我得出以下结论:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(KafkaHelper::decodeKafkaMessage)
 .map(m -> m.getData())
 .drainTo(Sinks.logger());


KafkaHelper类:

public final class KafkaHelper implements Serializable {

    private static final long serialVersionUID = -3556269069192202060L;

    public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {

        Gson gson = new Gson();

        KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);

        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }

         return kafkaMessage;
    }

}


这种方法是否遵循规范/要求将DistributedFunction传递给.map()?如果是,为什么?如果没有,我应该怎么做?

最佳答案

是的,在两个示例中,您都在创建DistributedFunction实例并将其传递给map()。 Java 8有一个规则,根据该规则,第一个示例中的lambda函数和第二个示例中的方法引用用于创建DistributedFunction的综合子类型,该子类型使用您提供的代码实现其Single Abstract Method(“ SAM”)。

您的KafkaHelper不必是Serializable,因为您无需实例化它。您还可以将静态方法decodeKafkaMessage放在任何其他类中,因为它与该类实例无关。

10-06 13:47