我试图为Apache Beam创建一个模板,以将数据索引到elasticsearch。正在创建模板,但是在调用模板时,管道失败,没有协议(protocol)错误。由于错误与URL对象有关,因此看起来很奇怪。

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public interface IndexToEsOptions extends PipelineOptions {
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        @Description("Index name to index with")
        ValueProvider<String> getIndexName();
        void setIndexName(ValueProvider<String> value);

        @Description("Index template name")
        ValueProvider<String> getIndexTemplate();
        void setIndexTemplate(ValueProvider<String> value);

        @Description("URI for es")
        @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
        ValueProvider<String> getEsUri();
        void setEsUri(ValueProvider<String> value);
    }

    public static void main(String[] args) {



        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args).
                        withValidation().as(IndexToEsOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                                new String[]{options.getEsUri().toString()},
                                options.getIndexName().toString(),
                                options.getIndexTemplate().toString())
                                .withConnectTimeout(240)
                        )
                        .withMaxBatchSizeBytes(15 * 1024 * 1024)
                );

        p.run();
    }
我运行时遇到的错误是

最佳答案

简而言之,不,它看起来不像ElasticsearchIO.ConnectionConfiguration支持ValueProviders,至少从当前版本(2.22.0)开始不支持。您可以通过查看 ConnectionConfiguration.Create 的签名来查看:

public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                             java.lang.String index,
                                                             java.lang.String type)
并将其与不支持ValueProviders的函数 ElasticsearchIO.Read.withQuery 进行比较:
public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)
为了支持ValueProvider,函数实际上必须接受ValueProvider对象。这是因为ValueProvider旨在在运行时而不是在管道构建期间传递参数。因此,在管道构建期间,必须将其作为ValueProvider对象传递给所有地方。
在您的示例中,正在发生的事情是您在ValueProvider上为toString调用EsUri,而不是生成包含您的URL的字符串,而是获得了ValueProvider的字符串表示形式,如下所示:"RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}。这就是为什么要获取MalformedURLException的原因。它正在尝试将该字符串读取为失败的URL。
但是,解决方案很简单,只需将EsUri参数从ValueProvider<String>更改为String,即可将其更改为构造时间参数。请注意,将其用作构造时间参数意味着您每次要更改该参数时都需要重建管道。不幸的是,在添加ValueProvider支持之前,您将无能为力。

07-26 08:16