问题描述
我正在阅读SingleOutputStreamOperator#returns
的源代码,它的javadoc是:
I am reading the source code of SingleOutputStreamOperator#returns
, its javadoc is:
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Use this method the following way:
* <pre>{@code
* DataStream<Tuple2<String, Double>> result =
* stream.flatMap(new FunctionWithNonInferrableReturnType())
* .returns(new TypeHint<Tuple2<String, Double>>(){});
* }</pre>
*
* @param typeHint The type hint for the returned data type.
* @return This operator with the type information corresponding to the given type hint.
*/
它提到了FunctionWithNonInferrableReturnType
来说明returns方法的必要性,但是我无法编写这样一个NonInferrableReturnType
的类.你能帮忙写一个简单的吗?谢谢!
It mentions FunctionWithNonInferrableReturnType
to show case the necessity of returns method, but I am unable to write such a class that is NonInferrableReturnType
. Could you please help write a simple one? Thanks!
推荐答案
当文档说 NonInferrableReturnType
时,这意味着我们可以使用类型变量 ,或您喜欢的任何其他字母.因此,您可以创建一个返回
T
的 MapFunction
.但是你必须使用 .returns(TypeInformation.of(String.class)
例如,如果你的目标是返回一个 String
.
When the docs says NonInferrableReturnType
it means that we can use the type variable <T>
, or any other letter that you prefer. So you can create a MapFunction
that return a T
. But then you have to use .returns(TypeInformation.of(String.class)
for example, if your goal is to return a String
.
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
@Override
public T map(AbstractDataModel value) throws Exception {
return (T) value.getValue();
}
}
这里我使用的是您上一个问题的类 创建具有超类型的 MapFunction 时编译失败.没有 .returns(TypeInformation.of(String.class))
的相同代码编译但抛出运行时异常:
Here I am using the classes of your last question Compiling fails when creating MapFunction with super type . The same code without .returns(TypeInformation.of(String.class))
compiles but throw the runtime exception:
由于类型擦除而无法自动确定.你可以通过使用returns(...)方法给出类型信息提示转换调用的结果,或者让你的函数实现ResultTypeQueryable"接口.
public class NonInferrableReturnTypeStreamJob {
private final List<AbstractDataModel> abstractDataModelList;
private final ValenciaSinkFunction sink;
public NonInferrableReturnTypeStreamJob() {
this.abstractDataModelList = new ArrayList<AbstractDataModel>();
this.abstractDataModelList.add(new ConcreteModel("a", "1"));
this.abstractDataModelList.add(new ConcreteModel("a", "2"));
this.sink = new ValenciaSinkFunction();
}
public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
this.abstractDataModelList = abstractDataModelList;
this.sink = sink;
}
public static void main(String[] args) throws Exception {
NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
concreteModelTest.execute();
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(this.abstractDataModelList)
.map(new MyMapFunctionNonInferrableReturnType())
.returns(TypeInformation.of(String.class))
.addSink(sink);
env.execute();
}
}
如果您愿意,这里是此示例的集成测试:
In case you wish, here is the integration test for this example:
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;
public class NonInferrableReturnTypeStreamJobTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster;
private final int minAvailableProcessors = 4;
private final boolean runInParallel;
public NonInferrableReturnTypeStreamJobTest() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
this.runInParallel = availableProcessors >= minAvailableProcessors;
if (this.runInParallel) {
flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(minAvailableProcessors)
.setNumberTaskManagers(1)
.build());
}
}
@Test
public void execute() throws Exception {
List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
abstractDataModelList.add(new ConcreteModel("a", "1"));
abstractDataModelList.add(new ConcreteModel("a", "2"));
ValenciaSinkFunction.values.clear();
NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
streamJob.execute();
List<String> results = ValenciaSinkFunction.values;
assertEquals(2, results.size());
assertTrue(results.containsAll(Arrays.asList("1", "2")));
}
}
这篇关于SingleOutputStreamOperator#returns(TypeHint<T> typeHint) 方法的javadoc的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!