本文介绍了Flink不散发要存储在Cassandra中的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下POJO课,

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "testKey", name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;
}

Mapper代码是

DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));

 env.execute();
 
 CassandraSink.addSink(sideOutput)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();

没有.getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));也无法正常工作.

sideOutput没有发出要存储在Cassandra中的值.知道我在哪里做错了吗?

The sideOutput is not emitting value to store in Cassandra. any idea where I am doing wrong?

推荐答案

我会说,应该在构建管道之后(即在CassandraSink之后)调用env.execute();,并且会摆脱侧面输出.这样的事情应该起作用:

I would say, env.execute(); should be called after the pipeline is build, i.e. after the CassandraSink and would get rid of side output. Somethink like this should work:

DataStream<Reading> ds = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        });
 
 CassandraSink.addSink(ds)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();


 env.execute();

这篇关于Flink不散发要存储在Cassandra中的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 19:34