Javadoc of the SingleOutputStreamOperator#returns(TypeHint<T> typeHint) method

Question

I am reading the source code of SingleOutputStreamOperator#returns. Its javadoc is:

/**
 * Adds a type information hint about the return type of this operator. This method
 * can be used in cases where Flink cannot determine automatically what the produced
 * type of a function is. That can be the case if the function uses generic type variables
 * in the return type that cannot be inferred from the input type.
 *
 * <p>Use this method the following way:
 * <pre>{@code
 *     DataStream<Tuple2<String, Double>> result =
 *         stream.flatMap(new FunctionWithNonInferrableReturnType())
 *               .returns(new TypeHint<Tuple2<String, Double>>(){});
 * }</pre>
 *
 * @param typeHint The type hint for the returned data type.
 * @return This operator with the type information corresponding to the given type hint.
 */

It mentions FunctionWithNonInferrableReturnType to showcase the necessity of the returns method, but I am unable to write such a class whose return type is non-inferrable. Could you please help write a simple one? Thanks!

Recommended answer

When the docs say NonInferrableReturnType, they mean a function whose return type is a type variable such as <T> (or any other letter you prefer) that cannot be inferred from the input type. So you can create a MapFunction that returns a T. But then you have to call .returns(TypeInformation.of(String.class)), for example, if your goal is to return a String.

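import org.apache.flink.api.common.functions.MapFunction;

// The output type T is a free type variable, so it cannot be inferred from the input type.
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
    @Override
    @SuppressWarnings("unchecked")
    public T map(AbstractDataModel value) throws Exception {
        // Unchecked cast: the caller declares the real type through returns(...).
        return (T) value.getValue();
    }
}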
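To see why such a return type is non-inferrable, the following plain-Java sketch inspects a function's declared output type through reflection. It is only an illustration of type erasure and does not reproduce Flink's actual type extraction. The names ErasureDemo, Concrete, Generic and declaredOutputType are hypothetical, and java.util.function.Function stands in for MapFunction.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class; Function stands in for Flink's MapFunction.
class ErasureDemo {

    // The output type is fixed in the class declaration, so reflection can read it.
    static class Concrete implements Function<Object, String> {
        @Override
        public String apply(Object value) {
            return String.valueOf(value);
        }
    }

    // The output type is the free type variable T, as in MyMapFunctionNonInferrableReturnType<T>.
    static class Generic<T> implements Function<Object, T> {
        @Override
        @SuppressWarnings("unchecked")
        public T apply(Object value) {
            return (T) value;
        }
    }

    // Reads the second type argument (the output type) of the implemented Function interface.
    static Type declaredOutputType(Object function) {
        ParameterizedType iface =
                (ParameterizedType) function.getClass().getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // A concrete class: the declared output type is String.class.
        System.out.println(declaredOutputType(new Concrete()));
        // A generic class: only the type variable T is visible, even for Generic<String>.
        System.out.println(declaredOutputType(new Generic<String>()));
    }
}
```

The String in new Generic<String>() exists only at compile time. The instance itself carries no record of it, which is why the type has to be supplied separately.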

Here I am using the classes from your previous question, "Compiling fails when creating MapFunction with super type". The same code without .returns(TypeInformation.of(String.class)) compiles but throws a runtime exception:

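The return type of the function could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.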

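import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

public class NonInferrableReturnTypeStreamJob {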

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    public NonInferrableReturnTypeStreamJob() {
        this.abstractDataModelList = new ArrayList<AbstractDataModel>();
        this.abstractDataModelList.add(new ConcreteModel("a", "1"));
        this.abstractDataModelList.add(new ConcreteModel("a", "2"));
        this.sink = new ValenciaSinkFunction();
    }

    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    public static void main(String[] args) throws Exception {
        NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
        concreteModelTest.execute();
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(this.abstractDataModelList)
                // T is erased at runtime, so the output type must be declared explicitly.
                .map(new MyMapFunctionNonInferrableReturnType<String>())
                .returns(TypeInformation.of(String.class))
                .addSink(sink);

        env.execute();
    }
}
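The javadoc in the question passes an anonymous subclass, new TypeHint<Tuple2<String, Double>>(){}, instead of a Class object. The trailing {} matters: an anonymous subclass records its generic superclass, so the full parameterized type survives erasure. Below is a minimal plain-Java sketch of that general technique. The classes TypeTokenDemo and Hint are hypothetical stand-ins and are not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical demo class illustrating the anonymous-subclass ("type token") technique.
class TypeTokenDemo {

    // Abstract so that callers are forced to create a subclass, typically an anonymous one.
    abstract static class Hint<T> {
        private final Type type;

        protected Hint() {
            // The subclass declaration, e.g. Hint<Map<String, Double>>, is kept in class metadata.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // The {} creates an anonymous subclass that captures Map<String, Double>.
        Hint<Map<String, Double>> hint = new Hint<Map<String, Double>>() {};
        System.out.println(hint.getType().getTypeName());
    }
}
```

A plain Class object such as String.class is enough for non-generic types, as in the job above. A captured type like this is useful when the produced type itself has type arguments.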

If you wish, here is an integration test for this example:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;

public class NonInferrableReturnTypeStreamJobTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster;
    private final int minAvailableProcessors = 4;
    private final boolean runInParallel;

    public NonInferrableReturnTypeStreamJobTest() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.runInParallel = availableProcessors >= minAvailableProcessors;
        if (this.runInParallel) {
            flinkCluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(minAvailableProcessors)
                            .setNumberTaskManagers(1)
                            .build());
        }
    }

    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}


10-16 02:59