I am comparing the Apache Beam SDK with the Flink SDK for stream processing, in order to establish the cost/benefit of using Beam as an additional framework.

I have a very simple setup in which a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.

From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:

  • Using Apache Beam (running on Flink):

    1.1. Create a Pipeline object.

    1.2. Create a PCollection of Kafka records.

    1.3. Apply the windowing function.

    1.4. Transform the pipeline to key by window.

    1.5. Group the records by key (i.e. by window).

    1.6. Apply whatever function is required to the windowed records.
  • Using the Flink SDK:

    2.1. Create a DataStream from the Kafka source.

    2.2. Transform it into a keyed stream by providing a key function.

    2.3. Apply the windowing function.

    2.4. Apply whatever function is required to the windowed records.

  • While the Flink solution appears programmatically more succinct, in my experience it has been less efficient at processing large volumes of data. I can only imagine that the overhead is introduced by the key-extraction function, since this step is not required by Beam.

    My question is: am I comparing like with like? Are these processes not equivalent? What could explain the Beam approach being more efficient, given that it uses Flink as its runner (and all other conditions being equal)?

    Here is the code using the Beam SDK:
        PipelineOptions options = PipelineOptionsFactory.create();
    
        //Run with Flink
        FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
        flinkPipelineOptions.setRunner(FlinkRunner.class);
        flinkPipelineOptions.setStreaming(true);
        flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime
    
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(flinkPipelineOptions);
    
        // Create a PCollection of Kafka records
        PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.readBytes()
                .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
                .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
                .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));
    
        //Apply Windowing Function
        PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));
    
        //Transform the pipeline to key by window
        PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
                windowedKafkaCollection.apply(
                        ParDo.of(
                                new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context, IntervalWindow window) {
                                        context.output(KV.of(window, context.element()));
                                    }
                                }));
        //Group records by key (window)
        PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
                .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());
    
        //Process windowed data
        PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
                .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));
    
        // Run the pipeline.
        p.run().waitUntilFinish();
    

    Here is the code using the Flink SDK:
    // Create a Streaming Execution Environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setParallelism(6);
    
    //Connect to Kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
    properties.setProperty("group.id", CONSUMER_GROUP);
    
    DataStream<ObjectNode> stream = env
                .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));
    
    //Key by id
    stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
    
            //Set the windowing function.
            .timeWindow(Time.seconds(5L), Time.seconds(1L))
    
            //Process Windowed Data
            .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));
    
    // execute program
    env.execute("Using Flink SDK");
    

    Any insight would be much appreciated.

    Edit

    I thought I should add some metrics that may be relevant.

    Network Received Bytes

    Flink SDK
  • taskmanager.2: 2,644,786,446
  • taskmanager.3: 2,645,765,232
  • taskmanager.1: 2,827,676,598
  • taskmanager.6: 2,422,309,148
  • taskmanager.4: 2,428,570,491
  • taskmanager.5: 2,431,368,644

    Beam
  • taskmanager.2: 4,092,154,160
  • taskmanager.3: 4,435,132,862
  • taskmanager.1: 4,766,399,314
  • taskmanager.6: 4,425,190,393
  • taskmanager.4: 4,096,576,110
  • taskmanager.5: 4,092,849,114

    CPU Utilisation (Max)

    Flink SDK
  • taskmanager.2: 93.00%
  • taskmanager.3: 92.00%
  • taskmanager.1: 91.00%
  • taskmanager.6: 90.00%
  • taskmanager.4: 90.00%
  • taskmanager.5: 92.00%

    Beam
  • taskmanager.2: 52.0%
  • taskmanager.3: 71.0%
  • taskmanager.1: 72.0%
  • taskmanager.6: 40.0%
  • taskmanager.4: 56.0%
  • taskmanager.5: 26.0%

    Beam appears to use considerably more network I/O, whereas Flink uses considerably more CPU. Could this suggest that Beam is parallelising the processing in a more efficient way?

    Edit 2

    I am pretty sure the PueCalculatorFn classes are equivalent, but I will share the code here to see whether there are any obvious discrepancies between the two processes.

    Beam
    public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
    private transient List<IKafkaConsumption> realEnergyRecords;
    private transient List<IKafkaConsumption> itEnergyRecords;
    
    @ProcessElement
    public void processElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
        KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
        Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
        Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
        Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();
    
        //Calculate Pue
        IPueResult result = calculatePue(element.getKey(), records);
    
        //Create IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);
    
        //Return Pue keyed by Window
        c.output(KV.of(intervalWindowResult, result));
    }
    
    private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
        //Define accumulators to gather readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    
        //Declare variable to store the result
        BigDecimal pue = BigDecimal.ZERO;
    
        //Initialise transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();
    
        //Transform the results into a stream
        Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);
    
        //Iterate through each reading and add to the increment count
        streamOfRecords
                .map(record -> {
                    byte[] valueBytes = record.getKV().getValue();
                    assert valueBytes != null;
                    String valueString = new String(valueBytes);
                    assert !valueString.isEmpty();
                    return KV.of(record, valueString);
                }).map(kv -> {
            Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
            KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
            return KV.of(kv.getKey(), consumption);
    
        }).forEach(consumptionRecord -> {
                    switch (consumptionRecord.getKey().getTopic()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            realEnergyRecords.add(consumptionRecord.getValue());
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            itEnergyRecords.add(consumptionRecord.getValue());
                            break;
                    }
                }
        );
    
        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;
    
        //Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            //Calculate PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }
    
        //Create a PueResult object to return
        IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
        return new PueResult(intervalWindow, pue.stripTrailingZeros());
    }
    
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        RecordSenderFactory.closeSender();
        WindowSenderFactory.closeSender();
    }
    }
    

    Flink
    public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
    private transient List<KafkaConsumption> realEnergyRecords;
    private transient List<KafkaConsumption> itEnergyRecords;
    
    @Override
    public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
        Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
        Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
        BigDecimal pue = calculatePue(iterable);
    
        //Create IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords
                .stream()
                .map(e -> (IKafkaConsumption) e)
                .collect(Collectors.toList()), itEnergyRecords
                .stream()
                .map(e -> (IKafkaConsumption) e)
                .collect(Collectors.toList()));
    
    
        //Create PueResult object to return
        IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());
    
        //Collect result
        collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));
    
    }
    
    protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
        //Define accumulators to gather readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    
        //Declare variable to store the result
        BigDecimal pue = BigDecimal.ZERO;
    
        //Initialise transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();
    
        //Iterate through each reading and add to the increment count
        StreamSupport.stream(iterable.spliterator(), false)
                .forEach(object -> {
                    switch (object.get("topic").textValue()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                            realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                            itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                    }
    
                });
    
        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;
    
        //Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            //Calculate PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }
        return pue;
    }
    
    }
    

    Here is the custom deserialiser used in the Beam example.

    KafkaConsumptionDeserialiser
    public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {
    
    public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
        if(jsonElement == null) {
            return null;
        } else {
            JsonObject jsonObject = jsonElement.getAsJsonObject();
            JsonElement id = jsonObject.get("id");
            JsonElement energyConsumed = jsonObject.get("energyConsumed");
            Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
            Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
            JsonElement topic = jsonObject.get("topic");
            Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
            return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
        }
      }
    
    }
    

    Best Answer

    I am not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as the Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all following operations automatically take the windowing into account, so you do not need to group by window.

    Your Beam pipeline definition can be simplified as follows:

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);
    
    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...
    
    //Apply Windowing Function
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
     Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));
    
    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));
    
    // Run the pipeline.
    p.run().waitUntilFinish();
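
    One caveat: as posted, PueCalculatorFn consumes a pre-grouped KV<IntervalWindow, Iterable<...>>, so it would need reworking for this simplified pipeline. Because the window assignment is carried along implicitly, a per-window aggregation can be expressed without ever mentioning the window. Below is a rough sketch of the idea, not your exact PUE logic; it just sums energyConsumed per topic, and the sum is computed per sliding window automatically:

    // Rough sketch: extract (topic, energyConsumed) pairs, then sum per key.
    // The Sum is applied within each sliding window; no key-by-window or GroupByKey on the window is needed.
    PCollection<KV<String, Double>> energyPerTopicPerWindow = windowedKafkaCollection
            .apply("extractReadings", ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, Double>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KafkaRecord<byte[], byte[]> record = c.element();
                    // Reuse the custom Gson deserialiser from your code to parse the payload.
                    KafkaConsumption reading = new GsonBuilder()
                            .registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser())
                            .create()
                            .fromJson(new String(record.getKV().getValue()), KafkaConsumption.class);
                    c.output(KV.of(record.getTopic(), reading.getEnergyConsumed()));
                }
            }))
            .apply("sumPerTopic", Sum.<String>doublesPerKey());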
    

    As for the performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw increased performance with Beam on Flink.

    Edit: just to clarify further, you do not group on the JSON "id" field in the Beam pipeline, which you do in the Flink snippet.
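
    If the goal is an apples-to-apples comparison, the Beam pipeline would also need to key the windowed records by that same id before grouping. A rough, hypothetical sketch of what that could look like (it assumes the payload is JSON with an integer "id" field, as in the Flink snippet):

    // Hypothetical sketch: key the windowed records by the JSON "id" field so that Beam
    // shuffles the data the way the Flink job's keyBy(id) does, then group per id.
    PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById = windowedKafkaCollection
            .apply("keyById", ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<Integer, KafkaRecord<byte[], byte[]>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // Parse just enough of the payload to extract the id.
                    String json = new String(c.element().getKV().getValue());
                    int id = new Gson().fromJson(json, JsonObject.class).get("id").getAsInt();
                    c.output(KV.of(id, c.element()));
                }
            }));

    PCollection<KV<Integer, Iterable<KafkaRecord<byte[], byte[]>>>> groupedById =
            keyedById.apply(GroupByKey.<Integer, KafkaRecord<byte[], byte[]>>create());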
