本文介绍了Flink不散发要存储在Cassandra中的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有以下POJO课,
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
@Table(keyspace = "testKey", name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
}
Mapper代码是
DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String value, Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));
env.execute();
CassandraSink.addSink(sideOutput)
.setHost("localhost")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
没有.getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));
也无法正常工作.
sideOutput
没有发出要存储在Cassandra中的值.知道我在哪里做错了吗?
The sideOutput
is not emitting value to store in Cassandra. any idea where I am doing wrong?
推荐答案
我会说,应该在构建管道之后(即在CassandraSink
之后)调用env.execute();
,并且会摆脱侧面输出.这样的事情应该起作用:
I would say, env.execute();
should be called after the pipeline is build, i.e. after the CassandraSink
and would get rid of side output. Somethink like this should work:
DataStream<Reading> ds = stream.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String value, Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
});
CassandraSink.addSink(ds)
.setHost("localhost")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
env.execute();
这篇关于Flink不散发要存储在Cassandra中的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!