我跟随this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但是我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是“ .withBootstrapServers(options.getKafkaServer())”行。 Beam版本是2.9.0,这是我的代码的一部分。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()
    )


以下是我运行代码的方式:

mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"

最佳答案

为了通过ValueProvider访问值,您需要使用get方法,然后使用具体类型获取该值。

例如:
有选项时:

ValueProvider<String> getKafkaServer();

您可以通过以下方式访问它:

getKafkaServer().get()这将返回您的String对象。

好像KafkaIo Api需要获取字符串参数而不是ValueProvider一样,您必须从ValueProvider包装器中提取值。

10-02 07:22